/*
 * Copyright 1999-2018 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.csp.sentinel.dashboard.service;

import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import com.alibaba.csp.sentinel.cluster.ClusterStateManager;
import com.alibaba.csp.sentinel.dashboard.domain.cluster.state.ClusterUniversalStatePairVO;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.alibaba.csp.sentinel.util.function.Tuple2;

import com.alibaba.csp.sentinel.dashboard.client.SentinelApiClient;
import com.alibaba.csp.sentinel.dashboard.domain.cluster.ClusterAppAssignResultVO;
import com.alibaba.csp.sentinel.dashboard.domain.cluster.ClusterGroupEntity;
import com.alibaba.csp.sentinel.dashboard.domain.cluster.config.ClusterClientConfig;
import com.alibaba.csp.sentinel.dashboard.domain.cluster.config.ServerFlowConfig;
import com.alibaba.csp.sentinel.dashboard.domain.cluster.config.ServerTransportConfig;
import com.alibaba.csp.sentinel.dashboard.domain.cluster.request.ClusterAppAssignMap;
import com.alibaba.csp.sentinel.dashboard.util.MachineUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @author Eric Zhao
 * @since 1.4.1
 */
@Service
public class ClusterAssignServiceImpl implements ClusterAssignService {

	private final Logger LOGGER = LoggerFactory.getLogger(ClusterAssignServiceImpl.class);

	@Autowired
	private SentinelApiClient sentinelApiClient;

	@Autowired
	private ClusterConfigService clusterConfigService;

	private boolean isMachineInApp(/* @NonEmpty */ String machineId) {
		return machineId.contains(":");
	}

	private ClusterAppAssignResultVO handleUnbindClusterServerNotInApp(String app, String machineId) {
		Set<String> failedSet = new HashSet<>();
		try {
			List<ClusterUniversalStatePairVO> list = clusterConfigService.getClusterUniversalState(app).get(10,
					TimeUnit.SECONDS);
			Set<String> toModifySet = list.stream()
					.filter(e -> e.getState().getStateInfo().getMode() == ClusterStateManager.CLUSTER_CLIENT)
					.filter(e -> machineId.equals(e.getState().getClient().getClientConfig().getServerHost() + ':'
							+ e.getState().getClient().getClientConfig().getServerPort()))
					.map(e -> e.getIp() + '@' + e.getCommandPort()).collect(Collectors.toSet());
			// Modify mode to NOT-STARTED for all associated token clients.
			modifyToNonStarted(toModifySet, failedSet);
		}
		catch (Exception ex) {
			Throwable e = ex instanceof ExecutionException ? ex.getCause() : ex;
			LOGGER.error("Failed to unbind machine <{}>", machineId, e);
			failedSet.add(machineId);
		}
		return new ClusterAppAssignResultVO().setFailedClientSet(failedSet).setFailedServerSet(new HashSet<>());
	}

	private void modifyToNonStarted(Set<String> toModifySet, Set<String> failedSet) {
		toModifySet.parallelStream().map(MachineUtils::parseCommandIpAndPort).filter(Optional::isPresent)
				.map(Optional::get).map(e -> {
					CompletableFuture<Void> f = modifyMode(e.r1, e.r2, ClusterStateManager.CLUSTER_NOT_STARTED);
					return Tuple2.of(e.r1 + '@' + e.r2, f);
				}).forEach(f -> handleFutureSync(f, failedSet));
	}

	@Override
	public ClusterAppAssignResultVO unbindClusterServer(String app, String machineId) {
		AssertUtil.assertNotBlank(app, "app cannot be blank");
		AssertUtil.assertNotBlank(machineId, "machineId cannot be blank");

		if (isMachineInApp(machineId)) {
			return handleUnbindClusterServerNotInApp(app, machineId);
		}
		Set<String> failedSet = new HashSet<>();
		try {
			ClusterGroupEntity entity = clusterConfigService.getClusterUniversalStateForAppMachine(app, machineId)
					.get(10, TimeUnit.SECONDS);
			Set<String> toModifySet = new HashSet<>();
			toModifySet.add(machineId);
			if (entity.getClientSet() != null) {
				toModifySet.addAll(entity.getClientSet());
			}
			// Modify mode to NOT-STARTED for all chosen token servers and associated
			// token clients.
			modifyToNonStarted(toModifySet, failedSet);
		}
		catch (Exception ex) {
			Throwable e = ex instanceof ExecutionException ? ex.getCause() : ex;
			LOGGER.error("Failed to unbind machine <{}>", machineId, e);
			failedSet.add(machineId);
		}
		return new ClusterAppAssignResultVO().setFailedClientSet(failedSet).setFailedServerSet(new HashSet<>());
	}

	@Override
	public ClusterAppAssignResultVO unbindClusterServers(String app, Set<String> machineIdSet) {
		AssertUtil.assertNotBlank(app, "app cannot be blank");
		AssertUtil.isTrue(machineIdSet != null && !machineIdSet.isEmpty(), "machineIdSet cannot be empty");
		ClusterAppAssignResultVO result = new ClusterAppAssignResultVO().setFailedClientSet(new HashSet<>())
				.setFailedServerSet(new HashSet<>());
		for (String machineId : machineIdSet) {
			ClusterAppAssignResultVO resultVO = unbindClusterServer(app, machineId);
			result.getFailedClientSet().addAll(resultVO.getFailedClientSet());
			result.getFailedServerSet().addAll(resultVO.getFailedServerSet());
		}
		return result;
	}

	@Override
	public ClusterAppAssignResultVO applyAssignToApp(String app, List<ClusterAppAssignMap> clusterMap,
			Set<String> remainingSet) {
		AssertUtil.assertNotBlank(app, "app cannot be blank");
		AssertUtil.notNull(clusterMap, "clusterMap cannot be null");
		Set<String> failedServerSet = new HashSet<>();
		Set<String> failedClientSet = new HashSet<>();

		// Assign server and apply config.
		clusterMap.stream().filter(Objects::nonNull).filter(ClusterAppAssignMap::getBelongToApp).map(e -> {
			String ip = e.getIp();
			int commandPort = parsePort(e);
			CompletableFuture<Void> f = modifyMode(ip, commandPort, ClusterStateManager.CLUSTER_SERVER)
					.thenCompose(v -> applyServerConfigChange(app, ip, commandPort, e));
			return Tuple2.of(e.getMachineId(), f);
		}).forEach(t -> handleFutureSync(t, failedServerSet));

		// Assign client of servers and apply config.
		clusterMap.parallelStream().filter(Objects::nonNull)
				.forEach(e -> applyAllClientConfigChange(app, e, failedClientSet));

		// Unbind remaining (unassigned) machines.
		applyAllRemainingMachineSet(app, remainingSet, failedClientSet);

		return new ClusterAppAssignResultVO().setFailedClientSet(failedClientSet).setFailedServerSet(failedServerSet);
	}

	private void applyAllRemainingMachineSet(String app, Set<String> remainingSet, Set<String> failedSet) {
		if (remainingSet == null || remainingSet.isEmpty()) {
			return;
		}
		remainingSet.parallelStream().filter(Objects::nonNull).map(MachineUtils::parseCommandIpAndPort)
				.filter(Optional::isPresent).map(Optional::get).map(ipPort -> {
					String ip = ipPort.r1;
					int commandPort = ipPort.r2;
					CompletableFuture<Void> f = modifyMode(ip, commandPort, ClusterStateManager.CLUSTER_NOT_STARTED);
					return Tuple2.of(ip + '@' + commandPort, f);
				}).forEach(t -> handleFutureSync(t, failedSet));
	}

	private void applyAllClientConfigChange(String app, ClusterAppAssignMap assignMap, Set<String> failedSet) {
		Set<String> clientSet = assignMap.getClientSet();
		if (clientSet == null || clientSet.isEmpty()) {
			return;
		}
		final String serverIp = assignMap.getIp();
		final int serverPort = assignMap.getPort();
		clientSet.stream().map(MachineUtils::parseCommandIpAndPort).filter(Optional::isPresent).map(Optional::get)
				.map(ipPort -> {
					CompletableFuture<Void> f = sentinelApiClient
							.modifyClusterMode(ipPort.r1, ipPort.r2, ClusterStateManager.CLUSTER_CLIENT)
							.thenCompose(v -> sentinelApiClient.modifyClusterClientConfig(app, ipPort.r1, ipPort.r2,
									new ClusterClientConfig().setRequestTimeout(20).setServerHost(serverIp)
											.setServerPort(serverPort)));
					return Tuple2.of(ipPort.r1 + '@' + ipPort.r2, f);
				}).forEach(t -> handleFutureSync(t, failedSet));
	}

	private void handleFutureSync(Tuple2<String, CompletableFuture<Void>> t, Set<String> failedSet) {
		try {
			t.r2.get(10, TimeUnit.SECONDS);
		}
		catch (Exception ex) {
			if (ex instanceof ExecutionException) {
				LOGGER.error("Request for <{}> failed", t.r1, ex.getCause());
			}
			else {
				LOGGER.error("Request for <{}> failed", t.r1, ex);
			}
			failedSet.add(t.r1);
		}
	}

	private CompletableFuture<Void> applyServerConfigChange(String app, String ip, int commandPort,
			ClusterAppAssignMap assignMap) {
		ServerTransportConfig transportConfig = new ServerTransportConfig().setPort(assignMap.getPort())
				.setIdleSeconds(600);
		return sentinelApiClient.modifyClusterServerTransportConfig(app, ip, commandPort, transportConfig)
				.thenCompose(v -> applyServerFlowConfigChange(app, ip, commandPort, assignMap))
				.thenCompose(v -> applyServerNamespaceSetConfig(app, ip, commandPort, assignMap));
	}

	private CompletableFuture<Void> applyServerFlowConfigChange(String app, String ip, int commandPort,
			ClusterAppAssignMap assignMap) {
		Double maxAllowedQps = assignMap.getMaxAllowedQps();
		if (maxAllowedQps == null || maxAllowedQps <= 0 || maxAllowedQps > 20_0000) {
			return CompletableFuture.completedFuture(null);
		}
		return sentinelApiClient.modifyClusterServerFlowConfig(app, ip, commandPort,
				new ServerFlowConfig().setMaxAllowedQps(maxAllowedQps));
	}

	private CompletableFuture<Void> applyServerNamespaceSetConfig(String app, String ip, int commandPort,
			ClusterAppAssignMap assignMap) {
		Set<String> namespaceSet = assignMap.getNamespaceSet();
		if (namespaceSet == null || namespaceSet.isEmpty()) {
			return CompletableFuture.completedFuture(null);
		}
		return sentinelApiClient.modifyClusterServerNamespaceSet(app, ip, commandPort, namespaceSet);
	}

	private CompletableFuture<Void> modifyMode(String ip, int port, int mode) {
		return sentinelApiClient.modifyClusterMode(ip, port, mode);
	}

	private int parsePort(ClusterAppAssignMap assignMap) {
		return MachineUtils.parseCommandPort(assignMap.getMachineId()).orElse(ServerTransportConfig.DEFAULT_PORT);
	}

}
